本文主要从使用层面介绍Flink独树一帜的设计, 帮助大家理解并更好的利用这些特性
Apache Flink的定义
Apache Flink是在无边界和有边界的数据流上进行有状态计算的框架.
同行对比
面对已经在实时领域耕耘了许久的两位老前辈: Storm和Spark Streaming, Flink有什么优势能够脱颖而出呢?
Spark Streaming
攒一段数据再计算, 本质还是批处理, 涉及到shuffle还是会有落盘, fetch和merge等操作. API用起来很流畅, 但是不适合维护State. Spark Streaming更适合做etl.Storm
Flink API跟Spark Streaming很像,但编程模型跟Storm更为接近。Storm自身不支持State,需要用户自己来维护,为了高可用和故障恢复, 用户通常会选择Redis做缓存,不过也由此也增加了对外部系统的依赖,同时也带来了额外的开销。
由此可见, Storm和Spark Streaming在State的处理上都有些力不从心, Flink与之相比最大的优势就是提供了完善的State的支持.
State
每个重要的流应用程序都是有状态的, 只有少数仅对事件做转换, 并且事件彼此独立的应用程序不需要状态. 任何运行基本业务逻辑的程序都需要记住事件或中间结果, 以便在后续的处理中访问它们.
从Flink在state处理上下文中提供的所有特性可以看到, state是Flink里的一等公民
.
多个state原型
flink为不同的数据结构提供了state原型, 比如原子value、list或map. 开发者可以结合业务逻辑选择合适的state原型.可插拔的state Backend
不同的后端state存放的位置不一样, 比如你可以配置RocksDBStateBackend
, flink会把state保存在RocksDB
中. RocksDB是一个高效的嵌入式磁盘KV数据库, 类似于HBASE
Exactly-once一致性状态保证
flink的检查点与故障恢复机制保证了应用程序的state在崩溃时的一致性, 因此失败是透明的, 不会影响应用的准确性.支持超大state
flink基于异步和增量的算法, 可以支持TB级别的state应用可拓展
flink为应用程序的拓展提供了强大的兼容性支持, 不论业务逻辑修改还是并行度变更, 你可以从容的应对业务的迭代
关键特性
event-time与watermark
flink支持3种时间模型:
Processing time
指数据进入到操作符的系统时间. 是最简单的时间概念, 不需要在流与机器之间协调. 它可以提供最好的性能和最低的延迟. 由于processing time是运行时指定的, 因此程序在这种时间模型下, 每次执行的结果都不一样.Event time
是记录实际产生的时间. 在进入flink之前就已经内嵌在记录当中. 在event-time, 程序处理的进度有数据来决定, 跟机器时钟没有关系. event-time程序必须定义如何产生水位线watermark
, 水位线可以描述程序的进度. event-time可以反映客观事实, 基于事件时间的程序的计算结果是最准确的, 并且每次执行结果都不变.Ingestion time
数据进入flink的时间, source操作符用其当前时间作为每个记录的时间, 基于事件的操作符(比如window)都会引用这个时间.
watermark
watermark是flink用来评估程序进度的机制. 在实际环境中数据不可避免的会产生乱序, watermark就是用来告诉flink低于它的记录都已经处理完毕了.
watermark作为stream的一部分随着记录在stream中流动, 它的本质就是一个时间戳. 在乱序的流中, watermark意味着早于它的记录都已经到达flink. 如下图
有的操作符消费多条输入流, 例如union
, join
, 或者跟在keyBy()
或partition()
函数后面的操作符, 这些操作符的当前事件时间就是它的输入流中最小的事件时间.
如何理解watermark呢
假设一个场景, flink收到的数据绝大多数都是顺序的, 偶尔有几个乱序, 但最多不会超过1min. 即00:00产生的记录最迟01:00可以被flink消费到.
反过来讲, 当前时刻01:00, flink可以拿到最早00:00产生的记录; 02:00拿到最早01:00产生的记录…以此类推. 那么相对于当前时间T
, 水位线就是T-1
. 即当前时间减去1分钟, 在这之前的记录都已处理完毕, 不会再有更早的记录出现.
这是建立在消费紧跟着当前时间来模拟的. 对应到event-time, “当前时间”就可以认为是当前接收到记录的最大事件时间, 上面的公式就变成: max(T) - 1
如何生成watermark呢
接上述例子, 数据有1分钟的乱序
1 | env.addSource(...) |
window
窗口是处理无界流的心脏. 窗口将stream切分成了有宽度的”Buckets”, 在这之上我们可以进行计算.
1 | stream |
篇幅原因, 我们主要讨论keyed window, 以及其中keyBy, window, trigger和function api
window的生命周期
以[12:00, 12:05)的窗口为例, 属于这个窗口的第一个元素到达window操作符时, 这个窗口被创建. 当水位线越过12:06(水位线始终比当前最大事件时间小1分钟)时, 窗口被完全删除
window function
flink有两类window function
可累加的函数
reduce, aggregate, fold就属于这类函数. 它们每收到一条记录就进行计算, 记录不在内存中停留, 只会保留聚合结果. 这种函数由于是增量计算, 且不会存储大量数据, 所以效率很高. 但是能够处理的场景有限, 比如uv没法处理, 另外这类函数拿不到窗口的上下文来访问时间与state信息.
1 | DataStream<Tuple2<String, Long>> input = ...; |
ProcessWindowFunction
可以拿到包含了窗口所有记录的Iterable对象, 和一个包含了Time与State信息的Context对象, 这使得它提供了比其它函数更具灵活性. 不过这是以牺牲了性能与资源消耗为代价, 因为它无法增量计算, window的元素都缓存在内存中.
1 | DataStream<Tuple2<String, Long>> input = ...; |
触发器
触发器决定了窗口函数何时执行, flink用4个常量来描述触发时的行为:
CONTINUE
: 什么都不做FIRE
: 触发计算PURGE
: 清空窗口里的元素FIRE_AND_PURGE
: 触发计算并在之后清空窗口里的元素
默认情况下窗口会在watermark越过窗口的结束时间时调用。比如5min宽度的窗口[12:00,12:05),1min乱序的watermark,会在12:06结束时触发计算。
基于processing-time的程序由于没有watermark,所以会在12:05结束时触发
用户可能不愿意等到6min之后才看到结果,最好是每30s计算一个结果。对于event-time的程序,flink在不破坏watermark机制的前提下用Pane巧妙的解决了这个问题。
基于processing-time的程序无法做到在窗口期内触发计算
Pane是flink根据触发间隔(本例中是30s)而在逻辑上划分的数据块(block),即一个window会划分成n个pane,n = window_size / pane_size。当水位线越过pane的end_tine时触发计算,计算的数据集就是pane里的数据。
注意
如果窗口函数是ProccessFunction,则窗口里的数据默认会在窗口销毁时清空。换句话说pane会包含之前pane的数据,这部分数据如何处理呢?flink提供了触发器的包装类PurgingTrigger,它会在每次pane触发计算后清空窗口的数据。
1 | source |
checkpoint
Flink提供了一种容错机制,可以恢复应用程序的state到最近的一致性状态。该机制可确保即使出现故障,程序的状态最终也将以Exactly-Once
的语义回溯数据流中的每条记录。这个机制就是Checkpoint.
当应用程序失败时, flink会停止数据流, 系统会重启程序并将操作符重置到最近一次成功的checkpoint. 重启的程序处理的所有记录都会保证不与前一次checkpoint里的数据重复.
flink分布式快照的核心是stream栅栏(Barriers). 这些Barrier从source开始注入到数据流中, 作为流的一部分随着记录一起发送到下游.
如图barriers将数据流分割成了若干部分, 每个barrier前面的记录进入到当前的快照, 之后的记录会进入下一次快照. Barriers不会打断数据流因此它非常轻量.
不同快照的barriers可以同时出现在数据流中, 这意味着多个快照可能会并发执行.
Barrier从source开始注入到流中, 当中间的操作符收到上游发送过来的全部barrier, 它在完成state快照后会把barrier发送给它所有的下游stream. 一旦sink收到了它所有上游stream的barrier, 会向checkpoint协调器发送快照完毕的ack消息. 所有的sink发送ack后快照就认为执行成功了. 快照的成功标志着属于当前快照的所有记录(包含产生的所有子孙记录)都已经通过了完整的数据流拓扑.
- 一旦操作符接收到了上游某一个stream的barrier, 则在收到其它stream的barrier之前都不会再处理这个stream的数据. 否则它会把分属于两个快照的数据混合到一起
- 在等待其它stream的barrier的同时, 操作符会将来自这个stream的数据缓存下来
- 当收到最后一个barrier, 操作符在完成自身的输出后把自己的barrier也发送出去
- 最后, 它开始恢复处理记录, 先处理缓存中的, 然后是来自stream的
注意
align buffer
是flinkExactly-Once
语义的保证, 只有在当前快照的记录都处理完了才会处理下一份快照的数据. 如果启用的是At-Least-Once
语义, 则操作符会跳过align, 持续处理记录. 这样记录可能会在多份快照中重复.
flink默认开启Exactly-Once, 手动开启的方法
1 | CheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); |
state
backend
flink提供了3个开箱即用的backend:
MemoryStateBackend
将数据以Objects
的形式存放到Java Heap, Key/Value state和window操作符用Hash表来存放values, 触发器等等. checkpoint触发时, 会将state的快照作为ack
的一部分发送到JobManager
, JobManager将它们存放到自己的堆内存中. 适合在本地开发和调试使用.FsStateBackend
需要配置一个文件系统的url, 比如”hdfs://namenode:8020/flink/checkpoints”或”file:///data/flink/checkpoints”. FsStateBackend同样是将state存放到内存中, checkpoint发生时将快照写到配置的文件系统中. 由于state存放在内存, 对其的访问都非常高效, 序列化只会在checkpoint时发生. 这个Backend的大小受限于JVM, 有OutOfMemorys
的风险.RocksDBStateBackend
也需要配置一个文件系统url, RocksDBStateBackend将state存放到RocksDB数据库中, RocksDB把数据文件存放到TaskManager的本地目录中. checkpoint触发时, RocksDBStateBackend将快照存放到配置的文件系统中. 这个Backend支持的state大小受限于可用的磁盘空间.RocksDBStateBackend是唯一一个支持增量checkpoint的Backend
与FsStateBackend不同, RocksDBStateBackend每次访问state都要经过序列化/反序列化, 因此会有一定的性能损耗.
如何选择合适的backend呢
- 如果数据量不大, 足以放入内存, 那么使用
FsStateBackend
可以获得更高的性能. 否则的话推荐使用RocksDBStateBackend
这个数据量的边界可以参考现有实时看板的DAU, 处理最高150w的用户id绰绰有余.
State Time-To-Live (TTL)
flink提供了ttl机制, 所有的集合类型的state都支持记录级别的ttl, 这意味着list元素或map记录可以单独过期.
ttl在RocksDBStateBackend时非常有用. 因为手动删除(state.clear())只是标记删除, state的体积不会减少, 只有在下次访问时才会真正删除. 所以RocksDBStateBackend的state会越来越大, 对于长期驻留在state里的顽固记录只有在下次启动程序时删除(因为RocksDB只会加载访问的数据). 这在其它两个backend上不会发生
ttl的配置非常简单
1 | import org.apache.flink.api.common.state.StateTtlConfig; |
注意
flink目前(1.10版本)只支持ProcessingTime的过期, EventTime的ttl会在后续的版本里添加.
uid
任何程序不可避免的会产生迭代, 当业务逻辑改变时, flink如何让旧的state兼容新的程序呢? 这里要分两个部分来讨论
- 应用程序的拓扑改变了
当应用程序从检查点重启时, flink将存储在保存点中的state与应用程序的有状态运算符进行匹配, 这个匹配就是基于操作符的id
来实现的. 每个操作符都有一个默认ID,该ID根据操作符在程序拓扑中的位置来决定. 因此没有修改的程序总是可以从检查点里恢复.
然而默认的id可能会随着程序的改变而改变, 开发者因此为操作符需要明确分配一个id, 只要id不变程序就能正常重启. 分配id很简单:
1 | val mappedEvents: DataStream[(Int, Long)] = events.map(new MyStatefulMapFunc()).uid("mapper-1") |
由于operator ID存放在检查点中, 必须跟程序启动时的id相等, 因此建议给所有将来有可能会升级的操作符分配一个唯一的ID
- 程序的并行度发生改变
对于Operator state
, state不会固定属于某一个子任务, flink在程序重启时采用轮询的方式为子任务分配state
对于Keyed state
, 每个Keyed state逻辑上绑定到一个唯一的<parallel-operator-instance, key>
组合, 一个key只会属于一个唯一的并行实例. 这个组合又被组织成Key Groups
, key group由若干连续的key组成, 是flink重新分发state的原子单位, key group的数量等于程序最大并行度
. 当程序的并行度发生改变, 每个操作符的并行实例分配到的key group随之变化, 但key相对于最高并行度的映射关系没有变, 因此可以任意拓展, 这其实就是一致性hash
的实现.